Conversation
Force-pushed from f96df50 to 82c5546
Force-pushed from 82c5546 to 3000120
Force-pushed from 070fba2 to 862274b
Force-pushed from 3000120 to 9ee38b4
Pull Request Test Coverage Report for Build 21671598072
💛 - Coveralls
Force-pushed from 64bfd6d to ff3da31
packages/llama-index-workflows/src/workflows/context/internal_context.py (resolved review thread)
```diff
     @property
     def status(self) -> Status:
-        """Get the current status by inspecting the handler state."""
-        if not self.run_handler.done():
-            return "running"
-        # done - check if cancelled first
-        if self.run_handler.cancelled():
-            return "cancelled"
-        # then check for exception
-        exc = self.run_handler.exception()
-        if exc is not None:
-            return "failed"
-        return "completed"
+        """Get the current status by inspecting the terminal event or handler state.
+
+        Status is derived from the terminal event type when available:
+        - WorkflowCancelledEvent -> "cancelled"
+        - WorkflowTimedOutEvent -> "failed" (timeout is a failure mode)
+        - WorkflowFailedEvent -> "failed"
+        - Plain StopEvent -> "completed"
+
+        Falls back to checking handler state if no terminal event yet.
+        """
+        # First check if we have a terminal event - derive status from event type
+        if self._terminal_event is not None:
+            if isinstance(self._terminal_event, WorkflowCancelledEvent):
+                return "cancelled"
+            elif isinstance(self._terminal_event, WorkflowTimedOutEvent):
+                return "failed"
+            elif isinstance(self._terminal_event, WorkflowFailedEvent):
+                return "failed"
+            else:
+                return "completed"
+
+        # Fall back to handler state check if no terminal event yet
+        if not self.run_handler.is_done():
+            return "running"
+        # If handler is done but we don't have a terminal event, it was likely
+        # cancelled externally or failed before emitting a terminal event
+        return "running"
```
🟡 WorkflowServer status can report "running" even after handler completion if no terminal event was observed
In _WorkflowHandler.status, if no terminal event was recorded and run_handler.is_done() is true, the code returns "running" unconditionally.
Actual behavior: completed/failed/cancelled runs can be reported as "running" in persistence/API if the terminal StopEvent was not observed/recorded by _stream_events for any reason.
Expected behavior: if the handler is done and there is no terminal event, the status should be derived from handler completion state (cancelled vs exception vs completed), not forced to "running".
Code: workflows/server/server.py:1685-1713
Recommendation: When run_handler.is_done() is true and _terminal_event is None, fall back to run_handler.cancelled() / run_handler.exception() to classify as cancelled/failed/completed (or at least "failed"/"cancelled"), rather than returning "running".
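A minimal sketch of that fallback, reusing only the names already visible in the diff above (`_terminal_event`, `run_handler`, the event classes); illustrative, not the exact server code:

```python
@property
def status(self) -> Status:
    # Prefer the recorded terminal event when one was observed.
    if self._terminal_event is not None:
        if isinstance(self._terminal_event, WorkflowCancelledEvent):
            return "cancelled"
        if isinstance(self._terminal_event, (WorkflowTimedOutEvent, WorkflowFailedEvent)):
            return "failed"
        return "completed"
    if not self.run_handler.is_done():
        return "running"
    # Handler finished without a terminal event being observed: classify from
    # handler state instead of reporting "running".
    if self.run_handler.cancelled():
        return "cancelled"
    if self.run_handler.exception() is not None:
        return "failed"
    return "completed"
```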
Force-pushed from b231c6d to 2574a33
```python
def __init__(self) -> None:
    self._queues: dict[str, AsyncioAdapterQueues] = {}
    self._max_concurrent_runs: weakref.WeakValueDictionary[
        str, asyncio.Semaphore
    ] = weakref.WeakValueDictionary()
```
🔴 BasicRuntime concurrency limiting can silently stop working due to WeakValueDictionary semaphore storage
BasicRuntime stores per-workflow semaphores in a weakref.WeakValueDictionary:
`self._max_concurrent_runs: weakref.WeakValueDictionary[str, asyncio.Semaphore]` (basic.py:184-188).
Because the only long-lived reference to each semaphore is the weak dictionary entry, semaphores may be garbage-collected at any time when not currently being awaited. When that happens, the next call to _maybe_acquire_max_concurrent_runs() will create a new semaphore, effectively resetting concurrency limits and allowing more than the configured num_concurrent_runs.
Actual: concurrency limit can be bypassed intermittently/non-deterministically.
Expected: concurrency limit should be enforced consistently for the process lifetime (or at least until runtime.destroy()).
Impact: can exceed intended concurrency caps, causing resource exhaustion and incorrect load-shedding behavior.
Recommendation: Use a normal dict (strong refs) for _max_concurrent_runs, and clear it in destroy(); or otherwise keep strong references for semaphore lifetime management.
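A sketch of the strong-reference variant; the signature of `_maybe_acquire_max_concurrent_runs` and the `destroy()` body below are assumed for illustration, only the storage change is the point:

```python
def __init__(self) -> None:
    self._queues: dict[str, AsyncioAdapterQueues] = {}
    # Plain dict: semaphores are strongly referenced for the runtime's lifetime,
    # so the configured limit cannot be reset by garbage collection.
    self._max_concurrent_runs: dict[str, asyncio.Semaphore] = {}

async def _maybe_acquire_max_concurrent_runs(  # assumed signature
    self, workflow_name: str, num_concurrent_runs: int | None
) -> asyncio.Semaphore | None:
    if num_concurrent_runs is None:
        return None
    sem = self._max_concurrent_runs.get(workflow_name)
    if sem is None:
        sem = asyncio.Semaphore(num_concurrent_runs)
        self._max_concurrent_runs[workflow_name] = sem
    await sem.acquire()
    return sem

async def destroy(self) -> None:
    # Release the strong references only when the runtime is torn down.
    self._max_concurrent_runs.clear()
```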
```python
async def cancel_handlers_and_tasks(self, *, graceful: bool = True) -> None:
    """Cancel the handler and release it from the store.

    Args:
        graceful: If True, request graceful cancellation and wait for
            WorkflowCancelledEvent. If False, force immediate cancellation
            (used for idle release where we don't want to emit cancel event).
    """
    if not self.run_handler.is_done():
        if graceful:
            try:
                # Request graceful cancellation - this will emit WorkflowCancelledEvent
                await self.run_handler.cancel_run()
            except Exception:
                pass
            try:
                # Wait for the workflow to complete after cancellation
                # This gives time for WorkflowCancelledEvent to be emitted
                await asyncio.wait_for(self.run_handler, timeout=2.0)
            except asyncio.TimeoutError:
                # Force cancel if graceful cancellation didn't complete in time
                self.run_handler.cancel()
            except asyncio.CancelledError:
                pass
            except Exception:
                pass
        else:
            # Force immediate cancellation without waiting
            try:
                await self.run_handler.cancel_run()
            except Exception:
                pass
            try:
                self.run_handler.cancel()
            except Exception:
                pass
```
🟡 Idle release uses graceful cancellation even when graceful=False, contradicting intended semantics
_WorkflowHandler.cancel_handlers_and_tasks(graceful=False) is documented/used for idle release “where we don't want to emit cancel event”, but it still calls await self.run_handler.cancel_run() before hard-cancelling.
Actual: idle release sends a TickCancelRun into the workflow (cancel_run()), which can cause the workflow to emit WorkflowCancelledEvent and transition persisted status to cancelled.
Expected: idle release should stop in-memory execution without changing logical workflow outcome (it should remain resumable/running), or at least avoid emitting cancellation signals.
Code (workflows/server/server.py:1937-1945), in the non-graceful branch:
`await self.run_handler.cancel_run()` ... `self.run_handler.cancel()`
Impact: idle release may incorrectly cancel runs instead of just unloading them, breaking resumability and causing incorrect persisted status.
Recommendation: For graceful=False, avoid calling cancel_run(); instead only stop the streaming task and close adapter/resources (or implement a runtime-specific ‘detach/unload’ that doesn’t enqueue TickCancelRun).
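One possible shape for the non-graceful branch under that recommendation; `_streaming_task` and `_adapter` are hypothetical names for whatever the handler wrapper actually holds, since those attributes aren't shown here:

```python
else:
    # Idle release: unload from memory without changing the run's logical
    # outcome. Avoid cancel_run(), which enqueues a TickCancelRun and can
    # emit WorkflowCancelledEvent / flip persisted status to "cancelled".
    if self._streaming_task is not None:  # hypothetical: the _stream_events task
        self._streaming_task.cancel()
    try:
        # hypothetical: release adapter/runtime resources; a runtime-specific
        # "detach"/"unload" hook would go here instead of a cancellation.
        await self._adapter.close()
    except Exception:
        pass
```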
Force-pushed from 2b2fe68 to e31a92d
Force-pushed from 3f995dc to 380caf0
Force-pushed from e6ed713 to 3149d66
Force-pushed from 7019925 to 9a025f9
Force-pushed from bc769c2 to 1a9de11
Force-pushed from 8c4d60a to 30278d7
Force-pushed from 69da0b3 to 9a296b1
Force-pushed from 53dab5b to ee2c407
Force-pushed from 6631af8 to 67c0fdc
Force-pushed from 250d793 to 35194c5
Check out this pull request on ReviewNB to see visual diffs & provide feedback on Jupyter Notebooks. Powered by ReviewNB.
```python
async with self._lock:
    state, commit_fn = await self._run_sync(_edit_with_lock)
    try:
        yield state
        await self._run_sync(commit_fn, state)
    except Exception:
        raise
```
🔴 Database connection leak when exception raised in edit_state context
When an exception is raised inside the edit_state context manager, the database connection is never closed, causing a resource leak.
How the bug is triggered
- `_edit_with_lock()` opens a connection and begins a transaction (lines 492-493)
- It returns `state, commit_fn`, where `commit_fn` contains the cleanup logic (`trans.commit()` and `conn.close()`)
- In the async context manager (lines 519-525):
  - `state, commit_fn = await self._run_sync(_edit_with_lock)`: the connection is opened
  - `yield state`: user code runs
  - If user code raises an exception, execution jumps to lines 524-525, which just re-raise
  - `commit_fn` is never called, so the connection is never closed
Actual vs Expected
Actual: When user code raises an exception:

```python
async with store.edit_state() as state:
    state["value"] = "modified"
    raise ValueError("intentional error")  # Connection leaks!
```

The connection remains open and the transaction is left hanging.
Expected: The connection should be rolled back and closed when an exception occurs.
Impact
Database connection pool exhaustion over time, especially in long-running applications with error conditions. This could cause the application to eventually fail to connect to the database.
Recommendation: Add rollback and connection cleanup in the exception handler:

```python
async with self._lock:
    state, commit_fn = await self._run_sync(_edit_with_lock)
    try:
        yield state
        await self._run_sync(commit_fn, state)
    except Exception:
        # Need to rollback and close the connection
        def _rollback_and_close() -> None:
            try:
                trans.rollback()
            finally:
                conn.close()

        await self._run_sync(_rollback_and_close)
        raise
```

Alternatively, restructure to use a separate cleanup function that's always called, or capture `conn` and `trans` in a way that allows cleanup in the except block.
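A sketch of the "cleanup function that's always called" variant, assuming `_edit_with_lock` is extended to also return a `rollback_fn` wrapping `trans.rollback()` and `conn.close()` (that extension is hypothetical):

```python
async with self._lock:
    state, commit_fn, rollback_fn = await self._run_sync(_edit_with_lock)
    committed = False
    try:
        yield state
        await self._run_sync(commit_fn, state)
        committed = True
    finally:
        if not committed:
            # Roll back and close the connection on any exception,
            # including generator cleanup/cancellation.
            await self._run_sync(rollback_fn)
```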
Force-pushed from 35194c5 to 816069d
Force-pushed from 816069d to 2439fff
Force-pushed from 5624529 to 64861fc